Skip to content

Fix time segment pruning for remote clusters in multi-cluster routing#18855

Open
shauryachats wants to merge 3 commits into
apache:masterfrom
shauryachats:fix/time-segment-pruning-multi-cluster
Open

Fix time segment pruning for remote clusters in multi-cluster routing#18855
shauryachats wants to merge 3 commits into
apache:masterfrom
shauryachats:fix/time-segment-pruning-multi-cluster

Conversation

@shauryachats

Copy link
Copy Markdown
Collaborator

Motivation

When enableMultiClusterRouting=true, a broker (e.g. in DCA) builds a full routing table for each remote cluster (e.g.
PHX), including a TimeSegmentPruner per table. The pruner is supposed to skip segments whose time range does not
overlap the query's time filter — the same pruning that works correctly for local segments.

In practice, time pruning was silently broken for all remote segments. Every remote segment was mapped to
DEFAULT_INTERVAL = [0, Long.MAX_VALUE], which matches every query, so nothing was pruned.

On a federated table spanning two clusters in internal testing, numSegmentsQueried with enableMultiClusterRouting=true was 6× higher than two equivalent local-only queries combined (684 vs ~108).


Root cause

Two bugs in the routing update pipeline conspired to produce this outcome.

Bug 1 — TimeSegmentPruner.onAssignmentChange: computeIfAbsent prevented committed segments from being updated

TimeSegmentPruner maintains a map of segment → time interval. REALTIME segments first appear in CONSUMING state with startTime = -1, so they are mapped to DEFAULT_INTERVAL. When the segment later commits and ZooKeeper is updated with a valid startTime/endTime, the pruner should replace DEFAULT_INTERVAL with the real interval.

The old code used computeIfAbsent, which only writes if the key is absent. Since the CONSUMING entry already exists, the update is silently skipped. The segment stays at DEFAULT_INTERVAL permanently.

// Before
_intervalMap.computeIfAbsent(segment, k -> extractInterval(k, znRecord));

// After — re-evaluate any segment still at DEFAULT_INTERVAL
Interval existing = _intervalMap.get(segment);
if (existing == null || existing == DEFAULT_INTERVAL) {
    _intervalMap.put(segment, extractInterval(segment, znRecord));
}

Bug 2 — SegmentZkMetadataFetcher.onAssignmentChange: consuming segments were cached and never re-fetched

SegmentZkMetadataFetcher maintains _onlineSegmentsCached to avoid redundant ZK reads. Once a segment's ZNRecord is fetched, it is added to the cache and skipped on all subsequent onAssignmentChange calls.

The original caching condition was znRecord != null. A CONSUMING segment does have a non-null ZNRecord (it just has startTime = -1), so it was immediately cached. From that point on it was never re-fetched — even after it committed and ZK was updated with a valid time range. TimeSegmentPruner never received the committed ZNRecord, so Bug 1's fix never had a chance to run.

For local routing this is masked: when a server commits a segment, it sends a Helix user-defined message (UDM)
directly to the local broker, which calls refreshSegment() — bypassing the cache entirely. Remote brokers never receive
these UDMs (because remote brokers are spectators, not participants ), so onAssignmentChange is the sole update path for remote segments.


Fix

SegmentZkMetadataFetcher — use ExternalView state instead of caching consuming segments

Rather than caching a segment as soon as its ZNRecord is non-null, we consult the ExternalView that is already delivered
with every onAssignmentChange call:

for (String segment : onlineSegments) {
    if (_onlineSegmentsCached.contains(segment)) continue;  // committed, already cached
    if (isConsumingInExternalView(externalView, segment)) continue;  // still consuming, skip
    segments.add(segment);  // fetch from ZK
}

private static boolean isConsumingInExternalView(ExternalView externalView, String segment) {
    Map<String, String> stateMap = externalView.getStateMap(segment);
    return stateMap != null && stateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
}

A segment goes through exactly three states in this logic:

State In cache? CONSUMING in EV? Action
CONSUMING No Yes Skip — nothing useful to fetch yet
Just committed (CONSUMING → ONLINE) No No Fetch once — get committed ZNRecord
Committed, previously seen Yes No Skip — already cached

The ExternalView change that fires onAssignmentChange is the same event that flips the segment's state from CONSUMING to ONLINE. We use that state, already in memory, to decide what to fetch — no additional ZK reads required to make the decision.

TimeSegmentPruner — re-evaluate segments at DEFAULT_INTERVAL

With the above fix ensuring the committed ZNRecord now flows through on each transition, the computeIfAbsent in
TimeSegmentPruner is replaced with a conditional put that updates any segment still at DEFAULT_INTERVAL:

Interval existing = _intervalMap.get(segment);
if (existing == null || existing == DEFAULT_INTERVAL) {
    _intervalMap.put(segment, extractInterval(segment, znRecord));
}

Why this does not degrade performance

onAssignmentChange is called on a background thread in response to Helix ExternalView changes — it is never on the
query path.

The original code fetched a segment's ZNRecord once (when first seen as CONSUMING), cached it, and skipped it forever.
The new code does the same for committed segments. The key difference is what happens to CONSUMING segments: instead of fetching and caching them eagerly, we skip them entirely using the ExternalView state already in memory. No ZK read is issued for a CONSUMING segment on any onAssignmentChange call.

The number of ZK reads per onAssignmentChange is now O(segments that just committed in this batch) — typically 1–5 per ExternalView change during active ingestion — compared to O(all currently-consuming segments) that a naive re-fetch approach would require. In steady state between commits, zero extra ZK reads are performed beyond what the original code did.

The IntervalTree rebuild in TimeSegmentPruner at the end of every onAssignmentChange is O(N log N) in total
segments and was already happening before this change — unaffected.


Verification

Tested on a staging cluster:

useMultiClusterRouting=false (local only):  queried=54   processed=30   ✓ (baseline, unchanged)
useMultiClusterRouting=true  (local + remote):   queried=108  processed=60   ✓ (was 684, now = 54+54)

numSegmentsQueried with multi-cluster routing now equals the sum of both clusters' local-only counts.
numSegmentsProcessed is unchanged — correct results, no data regression.

shauryachats and others added 2 commits June 25, 2026 16:37
When enableMultiClusterRouting=true, the local broker builds a TimeSegmentPruner
for each remote cluster. Two bugs caused all remote segments to be mapped to
DEFAULT_INTERVAL=[0,MAX], bypassing time pruning entirely and inflating
numSegmentsQueried (684 observed vs ~108 expected on a DCA+PHX federated table).

Bug 1 — TimeSegmentPruner.onAssignmentChange used computeIfAbsent:
REALTIME segments start CONSUMING with startTime=-1, so they map to
DEFAULT_INTERVAL. When they commit and get a valid startTime, computeIfAbsent
silently skips the update because the key already exists. Fix: replace with a
conditional put that re-evaluates any segment currently at DEFAULT_INTERVAL.

Bug 2 — SegmentZkMetadataFetcher cached consuming segments immediately:
The cache used znRecord != null as the caching condition. Consuming segments
have a non-null ZNRecord (just startTime=-1), so they were cached and never
re-fetched. For local routing this is masked by per-segment refreshSegment()
UDMs sent by servers on commit; for remote routing there is no UDM path so
onAssignmentChange() is the only update path — and it permanently skipped the
cached consuming segments.

Fix: skip segments whose ExternalView state is CONSUMING instead of caching
them. When a segment transitions CONSUMING->ONLINE (i.e. commits), the next
ExternalView change finds it uncached and non-consuming, fetches its committed
ZNRecord, and Bug 1's fix updates the interval. This is O(transitions) per
ExternalView change rather than O(consuming segments), avoiding redundant ZK
reads on clusters with many small tables.

Verified on live nodes: numSegmentsQueried with enableMultiClusterRouting=true
dropped from 684 to 108 (= 54+54, matching two local-only queries).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@codecov-commenter

codecov-commenter commented Jun 26, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 0% with 16 lines in your changes missing coverage. Please review.
✅ Project coverage is 37.17%. Comparing base (dc4e957) to head (bb7dcef).
⚠️ Report is 3 commits behind head on master.

Files with missing lines Patch % Lines
...ting/segmentmetadata/SegmentZkMetadataFetcher.java 0.00% 13 Missing ⚠️
...roker/routing/segmentpruner/TimeSegmentPruner.java 0.00% 3 Missing ⚠️

❗ There is a different number of reports uploaded between BASE (dc4e957) and HEAD (bb7dcef). Click for more details.

HEAD has 4 uploads less than BASE
Flag BASE (dc4e957) HEAD (bb7dcef)
java-21 5 4
unittests1 1 0
unittests 2 1
temurin 5 4
Additional details and impacted files
@@              Coverage Diff              @@
##             master   #18855       +/-   ##
=============================================
- Coverage     64.81%   37.17%   -27.65%     
+ Complexity     1322     1321        -1     
=============================================
  Files          3393     3393               
  Lines        211246   211315       +69     
  Branches      33208    33229       +21     
=============================================
- Hits         136917    78553    -58364     
- Misses        63284   125558    +62274     
+ Partials      11045     7204     -3841     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 37.17% <0.00%> (-27.65%) ⬇️
temurin 37.17% <0.00%> (-27.65%) ⬇️
unittests 37.17% <0.00%> (-27.65%) ⬇️
unittests1 ?
unittests2 37.17% <0.00%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@xiangfu0 xiangfu0 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found one high-signal issue; see inline comment.

}
for (int i = 0; i < numSegments; i++) {
if (znRecords.get(i) != null) {
if (!isConsumingInExternalView(externalView, segments.get(i))) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This init-time cache update no longer checks whether the ZK fetch actually returned metadata. If an ONLINE segment comes back with a null ZNRecord here, TimeSegmentPruner.init() records DEFAULT_INTERVAL and this branch still marks the segment as cached, so later assignment changes will never retry the fetch. That regresses the old behavior and can leave the broker permanently unable to time-prune that segment until restart/refresh; please keep the original znRecords.get(i) != null guard in addition to the CONSUMING check before caching.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Just realised there is actually a deeper issue here beyond just null ZNRecords.

Even when the ZNRecord is non-null, there's a brief race window when a segment commits: the server updates the ExternalView to ONLINE before it finishes writing the committed ZNRecord to ZK. During this window, the segment is not CONSUMING in EV (so it passes the EV check), the ZNRecord exists but still has startTime = -1, and extractIntervalFromSegmentZKMetaZNRecord returns DEFAULT_INTERVAL. If cached at that moment, the segment is stuck with DEFAULT_INTERVAL forever since cached segments are never re-fetched.

Added an isCommittedZNRecord helper (znRecord != null && startTime >= 0) that handles both the null case, applied in both init() and onAssignmentChange().

… state

When a segment commits, the server updates ExternalView to ONLINE before
writing the ZNRecord to ZK. During SegmentZkMetadataFetcher.init(), if a
segment is ONLINE in EV but its ZNRecord still has startTime=-1 (brief
inconsistency window), the segment was cached immediately due to the
!isConsumingInExternalView check. Since cached segments are never re-fetched
in onAssignmentChange, the segment stayed at DEFAULT_INTERVAL permanently.

Fix: only cache a segment if its ZNRecord also has startTime >= 0 (committed).
Segments in the inconsistency window remain uncached and are re-evaluated on
the next onAssignmentChange once the ZNRecord is consistent.

This is the same check used for consuming segments but applied to caching
decisions rather than fetch-skip decisions, ensuring that EV state and ZNRecord
state are both valid before treating a segment as permanently resolved.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants